[sender] refactor for a simpler multi-thread behavior#209
Conversation
ivoanjo left a comment
I very much like this! I like how much simpler it looks once is_closed becomes a terminal state: once the instance gets there, there's no going back.
I also very much like how the mutex usage is kept off the main path of the code.
class FlushQueue < Queue
end
class CloseQueue < Queue
end
Since these aren't used elsewhere, I suggest putting them inside the Sender class itself.
Also (very minor) if you want a single-line definition, you can use:
FlushQueue = Class.new(Queue) # OR
class FlushQueue < Queue; end

blocking_queue = FlushQueue.new
channel << blocking_queue
blocking_queue.pop # wait for the bg thread to finish its work
blocking_queue.close if CLOSEABLE_QUEUES
To be honest, I'm growing increasingly unconvinced this whole business with the CLOSEABLE_QUEUES is worth it. It effectively creates two code paths for different Ruby versions, but it's not like we're going to drop support for the old Rubies soon (#close is a Ruby 2.3 feature, and we're still fighting to drop 2.0).
Would it be simpler to just remove this entirely? It doesn't seem like it would particularly improve performance anyway.
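To make that concrete, here is a minimal sketch (not the PR's code) of what the flush path could look like with the CLOSEABLE_QUEUES branch dropped; channel and blocking_queue are the names from the quoted diff, everything else is illustrative:

# Sketch only: always use a plain Queue and never call #close on it;
# once nothing references the queue anymore it is simply garbage collected.
def flush
  blocking_queue = Queue.new
  channel << blocking_queue
  blocking_queue.pop # wait for the bg thread to finish its work
end

This keeps a single code path across all supported Ruby versions.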
@sender_thread = Thread.new(&method(:send_loop))
end

def rendez_vous
The forwarder will probably need to be updated to not use #rendez_vous anymore, right? (To be honest, I don't quite understand the use-case of having both #sync_with_outbound_io and #flush at the top-level).
# Initialize and get the thread's sync queue
queue = (Thread.current[:statsd_sync_queue] ||= Queue.new)
We lost this caching-the-queue behavior in the refactoring, which doesn't seem like an issue, but just double-checking: is this OK from a performance point of view? (I'm actually not sure how expensive it is to create queues; probably not a lot.)
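For what it's worth, a quick and unscientific way to compare the two approaches (the labels and iteration count below are purely illustrative):

require "benchmark"

n = 100_000
Benchmark.bm(25) do |x|
  # allocate a fresh queue on every call, as the refactored code does
  x.report("Queue.new every time") { n.times { Queue.new } }
  # reuse a thread-local queue, as the old caching behavior did
  x.report("cached thread-local queue") do
    n.times { Thread.current[:statsd_sync_queue] ||= Queue.new }
  end
end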
# Compatibility with `Sender`
def start()
end
With this change, neither Sender nor SingleThreadedSender uses #start, so perhaps it would make sense to just remove them?
channel << blocking_queue
blocking_queue.pop # wait for the bg thread to finish its work
blocking_queue.close if CLOSEABLE_QUEUES
sender_thread.join(3) # wait for completion, timeout after 3 seconds
# TODO(remy): should I close `channel` here?
As I suggested above, I think it'd be simpler to just not use close; if we do, calling close here may be problematic if two stops get called concurrently while the background thread is taking long to finish. E.g. something like: T1 acquires the mutex -> tells the background thread to stop -> times out on join -> calls close -> releases the mutex; then T2 acquires the mutex -> does not see @is_closed set -> tries to write to the channel -> but the channel has already been closed.
Also, should the stop behavior be a bit more flexible? E.g. configurable timeout, or optionally run a block to decide what to do.
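To illustrate both points, here is a rough sketch of a stop that is safe to call concurrently and takes a configurable timeout; it is not the PR's implementation, and @mutex, @is_closed, @channel and @sender_thread are assumed names based on the quoted diffs:

# Sketch only: a second concurrent stop becomes a no-op instead of tearing the
# channel down twice, and the join timeout is exposed as a parameter.
def stop(join_timeout: 3)
  @mutex.synchronize do
    return if @is_closed # terminal state: once set, there's no going back

    @is_closed = true
    blocking_queue = Queue.new
    @channel << blocking_queue         # ask the bg thread to wind down
    blocking_queue.pop                 # wait for the bg thread to finish its work
    @sender_thread.join(join_timeout)  # wait for completion, with a timeout
  end
end

The important part is that @is_closed is both checked and set while holding the mutex, so the channel can only ever be torn down once; accepting a block to decide what to do on timeout could be layered on top of the same structure.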
DRAFT
The idea behind this PR is to improve the Sender objects' lifecycle (especially the message_queue and the sender_thread) in order to have a simpler implementation that does not constantly have to check for their existence.

On top of that, this PR synchronizes the #stop/#close mechanism so that it is blocking. The worst-case scenario with multiple calls to close/add in parallel would now be that metrics submitted after a close call are not flushed.
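As an illustration of that guarantee (statsd here stands for whatever object ends up exposing the refactored Sender; the names are only for the example):

# Illustrative only: concurrent close/add calls should not crash in either
# ordering; at worst, increments submitted after close wins the race are dropped.
threads = [
  Thread.new { statsd.close },
  Thread.new { 10.times { statsd.increment("requests") } }
]
threads.each(&:join)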